package cn.edu360.beans

import java.util.Properties

import org.apache.spark.sql._
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Batch job: reads ad-log parquet data, counts records per (province, city),
 * and writes the aggregate both as a local JSON file and into a MySQL table.
 */
object dmpread {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("统计日志文件中各省市的数据分布情况")
      // Use Kryo for faster serialization of data shuffled between workers.
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

    // JDBC sink configuration.
    // NOTE(review): credentials are hard-coded in source; move them to a
    // config file / environment variables before this leaves development.
    val url = "jdbc:mysql://192.168.254.5:3306/test?characterEncoding=utf-8"
    val table = "dmpwrite"
    val props = new Properties()
    props.setProperty("user", "root")
    props.setProperty("password", "327652")
    props.setProperty("driver", "com.mysql.jdbc.Driver")

    val sc = new SparkContext(conf)
    try {
      val sqlContext = new SQLContext(sc)

      // Load the parquet logs; cache because the table backs the SQL below.
      val parquet: DataFrame = sqlContext.read.parquet("d:/ParquetFile")
      parquet.cache()
      parquet.registerTempTable("t_dmp")

      // Count records per (province, city). `ct` is a bigint (Long).
      val result: DataFrame = sqlContext.sql("select count(*) as ct,provincename,cityname " +
        "from t_dmp group by provincename,cityname")

      // Collapse to a single partition so each sink gets one output file /
      // one JDBC connection, then write JSON locally and overwrite the
      // MySQL table (jdbc() creates the table if it does not exist).
      result.repartition(1).write.mode(SaveMode.Overwrite).json("D:/JsonFile/")
      result.coalesce(1).write.mode(SaveMode.Overwrite).jdbc(url, table, props)
    } finally {
      // Release cluster resources even when a read/write stage fails.
      sc.stop()
    }
  }
}
/**
 * Row model for the aggregated result: one record per (province, city).
 *
 * @param ct           number of log records for this province/city pair.
 *                     Declared as Long because Spark SQL `count(*)` yields
 *                     a bigint (Long); an Int here would fail when mapping
 *                     query rows. Existing `Int` call sites still compile
 *                     via Scala's implicit numeric widening.
 * @param provincename province name from the log data
 * @param cityname     city name from the log data
 */
case class Pod(ct: Long, provincename: String, cityname: String)
